package org.apache.flink.connector.jdbc.databases.phoenix.dialect;

import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.*;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class PhoenixDialect extends AbstractDialect {

    // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 0;

    // Define MAX/MIN precision of DECIMAL type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
    private static final int MAX_DECIMAL_PRECISION = 65;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public String getSelectFromStatement(
            String tableName, String[] selectFields, String[] conditionFields) {
        String[] dbAndTableName = tableName.split("\\.");

        String selectExpressions =
                Arrays.stream(selectFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String fieldExpressions =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "SELECT "
                + selectExpressions
                + " FROM "
                + quoteIdentifier(dbAndTableName[0])
                + "."
                + quoteIdentifier(dbAndTableName[1])
                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
    }

    @Override
    public String getUpdateStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        String[] dbAndTableName = tableName.split("\\.");
        String setClause =
                Arrays.stream(fieldNames)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(", "));
        String conditionClause =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "UPDATE "
                + quoteIdentifier(dbAndTableName[0])
                + "."
                + quoteIdentifier(dbAndTableName[1])
                + " SET "
                + setClause
                + " WHERE "
                + conditionClause;
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        String[] dbAndTableName = tableName.split("\\.");
        String conditionClause =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "DELETE FROM "
                + quoteIdentifier(dbAndTableName[0])
                + "."
                + quoteIdentifier(dbAndTableName[1])
                + " WHERE "
                + conditionClause;
    }

    @Override
    public String dialectName() {
        return "Phoenix";
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new PhoenixRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.ofNullable(getInsertIntoStatement(tableName, fieldNames));
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {

        String[] dbAndTableName = tableName.split("\\.");

        String columns =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
        return "UPSERT INTO "
                + quoteIdentifier(dbAndTableName[0])
                + "."
                + quoteIdentifier(dbAndTableName[1])
                + "("
                + columns
                + ")"
                + " VALUES ("
                + placeholders
                + ")";
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        String[] dbAndTableName = tableName.split("\\.");
        String fieldExpressions =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "SELECT 1 FROM "
                + quoteIdentifier(dbAndTableName[0])
                + "."
                + quoteIdentifier(dbAndTableName[1])
                + " WHERE "
                + fieldExpressions;
    }

    @Override
    public String appendDefaultUrlProperties(String url) {
        return super.appendDefaultUrlProperties(url);
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }
}
